{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
   or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
   https://securityonion.net/license; you may not use this file except in compliance with the
   Elastic License 2.0. #}
{% from 'kafka/map.jinja' import KAFKAMERGED %}
{% from 'vars/globals.map.jinja' import GLOBALS %}

{# Pillar-sourced inputs: the Kafka node map and the keystore / truststore passwords. #}
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %}
{% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}

{# Role string stored for this host under kafka:nodes:<hostname>:role.
   By default all Kafka eligible nodes are given the role of broker, except for
   grid MANAGER (broker,controller) until overridden through SOC UI #}
{% set node_type = salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname + ':role') %}

{# Collect one "<nodeid>@<node name>:9093" entry for every node in the pillar whose
   role string contains 'controller' (this also matches 'broker,controller'). #}
{% set controllers = [] %}
{% for kafka_host in KAFKA_NODES_PILLAR if 'controller' in KAFKA_NODES_PILLAR[kafka_host].role %}
{%   do controllers.append("%s@%s:9093" | format(KAFKA_NODES_PILLAR[kafka_host].nodeid, kafka_host)) %}
{% endfor %}

{# Comma-separated list of KRaft controllers, stored below as controller_x_quorum_x_voters #}
{% set kafka_controller_quorum_voters = controllers | join(',') %}

{# Fill in the role-specific values for the 'broker', 'controller' and 'broker,controller'
   node types. The role string is compared exactly, so at most one branch below runs. #}
{% set _broker_cfg = KAFKAMERGED.config.broker %}
{% set _controller_cfg = KAFKAMERGED.config.controller %}

{% if node_type == 'broker' %}
{#   Besides its own listener, node id and keystore password, a broker-only node copies the
     controller listener name and appends the controller protocol mapping to its own, which
     it needs for communicating with controller nodes. #}
{%   do _broker_cfg.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'ssl_x_keystore_x_password': KAFKA_PASSWORD,
       'controller_x_listener_x_names': _controller_cfg.controller_x_listener_x_names,
       'listener_x_security_x_protocol_x_map': _broker_cfg.listener_x_security_x_protocol_x_map
        + ',' + _controller_cfg.listener_x_security_x_protocol_x_map })
  %}

{% elif node_type == 'controller' %}
{#   Controller-only nodes just need the quorum voters, their node id and the keystore password. #}
{%   do _controller_cfg.update({
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'ssl_x_keystore_x_password': KAFKA_PASSWORD })
  %}

{% elif node_type == 'broker,controller' %}
{#   Kafka nodes of this type are not recommended for use outside of development / testing.
     The combined role is written into the broker section, whose listeners and protocol map
     are extended with the controller section's values. #}
{%   do _broker_cfg.update({
       'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092',
       'controller_x_listener_x_names': _controller_cfg.controller_x_listener_x_names,
       'controller_x_quorum_x_voters': kafka_controller_quorum_voters,
       'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid'),
       'process_x_roles': 'broker,controller',
       'ssl_x_keystore_x_password': KAFKA_PASSWORD,
       'listeners': _broker_cfg.listeners + ',' + _controller_cfg.listeners,
       'listener_x_security_x_protocol_x_map': _broker_cfg.listener_x_security_x_protocol_x_map
        + ',' + _controller_cfg.listener_x_security_x_protocol_x_map })
  %}

{% endif %}

{# The broker, controller and client sections all receive the same truststore password. #}
{% for section in ['broker', 'controller', 'client'] %}
{%   do KAFKAMERGED.config[section].update({'ssl_x_truststore_x_password': KAFKA_TRUSTPASS }) %}
{% endfor %}

{# The client section additionally gets the keystore password. #}
{% do KAFKAMERGED.config.client.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %}

{# KAFKACONFIG is the broker section for any role containing 'broker' (which includes
   'broker,controller'); every other role value gets the controller section. #}
{% set KAFKACONFIG = KAFKAMERGED.config.broker if 'broker' in node_type else KAFKAMERGED.config.controller %}

{# KAFKACLIENT is the client section. #}
{% set KAFKACLIENT = KAFKAMERGED.config.client %}